---
layout: post
date: 2024-10-17
title: "TCP Server in Zig - Part 5b - Poll"
description: "Using non-blocking sockets and poll to improve the scalability of our system."
tags: [zig]
---

<p>In the previous part we introduced non-blocking sockets and used them, along with the <code>poll</code> system call, to maximize the efficiency of our server. Rather than having a thread-per-connection, waiting on data, a single thread can now manage multiple client connections. But this performance leap doesn't come for free: our code has gotten more complex.</p>

<p>At a high level, I think the idea behind evented I/O is straightforward. We ask the operating system to monitor a list of sockets, and it notifies us when those sockets are ready. We'll soon switch to alternatives to <code>poll</code> which perform better, have nicer APIs and are more powerful (at the cost of more complexity), but despite <code>poll</code>'s awkwardness we've managed to create a relatively clean TCP server implementation.</p>

<p>Of course, we're also missing a number of important features. Our last working example crashes if too many clients try to connect, it doesn't write to the client and it doesn't implement any timeouts. We still have a lot of work to do if we want something approaching production-ready.</p>

<h3 id=connection_limit><a href="#connection_limit" aria-hidden=true>Connection Limit</a></h3>
<p>The most glaring bug is that our server crashes if too many simultaneous clients try to connect. This happens because, on a new connection, we don't do any bounds checking on the <code>clients</code> and <code>client_polls</code> slices. Exactly how we fix this will be app-dependent. Maybe you want to allow new connections by disconnecting old ones. Maybe you want to reject the connection but write an error message. We'll do something simpler: only accept a connection if we have an available slot.</p>

<p>One way to stop accepting connections is to remove the listener's <code>pollfd</code> from our <code>polls</code> slice. But since we'll want to re-enable this notification once free slots become available, a better way is to modify the existing entry:</p>

{% highlight zig %}
fn accept(self: *Server, listener: posix.socket_t) !void {
    const available = self.client_polls.len - self.connected;
    for (0..available) |_| {
        // ...
        // all the same as before
        // we accept the connection, create a client and store it in the clients slice
        // add a pollfd to our client_polls
        // and increment self.connected by one
        // ...
    } else {
        // polls[0] is _always_ the listening socket
        self.polls[0].events = 0;
    }
}
{% endhighlight %}

<p>In addition to the original code, which accepts connections until <code>accept</code> returns <code>error.WouldBlock</code>, we now also limit the number of connections to the <code>available</code> space we have. Once we've used up the available space (the <code>else</code> on a <code>for</code> only executes when the loop runs to completion, not when it exits via a <code>break</code> or <code>return</code>), we disable the notification on our listening socket. Here we disable it by setting <code>events</code> to <code>0</code>. Another option is to negate the file descriptor, since <code>poll</code> ignores entries with a negative <code>fd</code>. This has the benefit of preserving the <code>events</code> value for when we re-enable the monitor by negating the descriptor back to a positive value.</p>
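<p>To make the negation option concrete, here's a minimal sketch of two hypothetical helpers, <code>disableMonitor</code> and <code>enableMonitor</code>, which aren't part of our server. It assumes a POSIX <code>poll</code>, which skips entries whose <code>fd</code> is negative and reports <code>0</code> in their <code>revents</code>. One caveat: the trick can't work for file descriptor 0, since negating 0 gives 0.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical helpers, not part of the server in this series.
// poll ignores any entry with a negative fd, so flipping the sign toggles
// monitoring while leaving `events` untouched.
// Caveat: this cannot work for fd 0, since -0 == 0.

/// Stop poll from monitoring this entry.
fn disableMonitor(pfd: *posix.pollfd) void {
    if (pfd.fd > 0) pfd.fd = -pfd.fd;
}

/// Resume monitoring this entry with whatever `events` it already had.
fn enableMonitor(pfd: *posix.pollfd) void {
    if (pfd.fd < 0) pfd.fd = -pfd.fd;
}
{% endhighlight %}

<p>With this approach, <code>removeClient</code> would call <code>enableMonitor(&amp;self.polls[0])</code> instead of writing <code>POLL.IN</code> back into <code>events</code>.</p>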

<p>All that's left to do is re-enable the monitor when space becomes available:</p>

{% highlight zig %}
fn removeClient(self: *Server, at: usize) void {
    // ...
    // everything else is unchanged ...
    // just need to add this somewhere
    // ...
    self.polls[0].events = posix.POLL.IN;
}
{% endhighlight %}

<p>Just like that, we've fixed a bad bug. More importantly, we've seen how we can modify a <code>pollfd</code> value. As above, this can be used to disable and enable monitors, but it can also be used to alternate between monitoring for read-readiness and write-readiness.</p>

<h3 id=writes><a href="#writes" aria-hidden=true>Writes</a></h3>
<p>When a client connects we monitor it by adding a new <code>pollfd</code> entry to <code>client_polls</code> and, through the <code>events</code> field, we register our interest in <code>posix.POLL.IN</code>. What would happen if we also registered our interest in <code>posix.POLL.OUT</code>?</p>

{% highlight zig %}
self.client_polls[connected] = .{
    .fd = socket,
    .revents = 0,
    // added: | posix.POLL.OUT
    .events = posix.POLL.IN | posix.POLL.OUT,
};
{% endhighlight %}

<p>This is almost certainly <strong>not</strong> what you want to do because the socket is almost always ready to be written to. If you make this change, run the code and connect a client, <code>poll</code> will constantly fire because <code>revents & posix.POLL.OUT == posix.POLL.OUT</code> will be true. What we need to do is register our interest in this event only when we have something to write.</p>
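<p>If you want to see the "always writable" behavior for yourself, the following sketch polls the write end of a pipe with a timeout of 0, so that <code>poll</code> reports the current readiness without blocking. Using a pipe is an assumption made to keep the example self-contained; a connected socket with free space in its send buffer is reported as writable in the same way. <code>writeEndIsReady</code> is a hypothetical name.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Demonstration only: a pipe stands in for a socket. Nothing has been
// written to it, yet poll reports the write end as ready immediately,
// because the kernel buffer has free space.
fn writeEndIsReady() !bool {
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    var polls = [_]posix.pollfd{.{
        .fd = fds[1], // the write end
        .events = posix.POLL.OUT,
        .revents = 0,
    }};

    // A timeout of 0 means: don't block, just report current readiness.
    const ready = try posix.poll(&polls, 0);
    return ready == 1 and polls[0].revents & posix.POLL.OUT == posix.POLL.OUT;
}
{% endhighlight %}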

<p>As we've said before, it's common for <code>read</code> to return a number of bytes that has nothing to do with our application's message boundaries. Likewise, it's common for <code>write</code> to send an entire message in a single call. Despite this, correctly writing using non-blocking I/O is much more nuanced than reading. I've seen many simple implementations assume that <code>write</code> doesn't fail and won't do a partial write. This is a dangerous assumption to make.</p>

<p>One challenge we have is that different applications have different requirements. Are multiple threads allowed to write to a socket? Are we implementing a strict request -> response flow, or can there be multiple incoming messages before a response is sent? Just like we potentially have to <code>poll</code> and <code>read</code> multiple times before getting a whole message, so too might we have to <code>poll</code> and <code>write</code> multiple times before sending a whole message. This means that the bytes being written might have to outlive the application's call to <code>writeMessage</code>. Furthermore, in Part 3, we introduced <a href=/TCP-Server-In-Zig-Part-3-Minimizing-Writes-and-Reads/#writev>vectored I/O (writev)</a> as an optimization, but now that our writes have to be stateful, it's another small complexity to worry about.</p>

<p>To make our life easier, we're going to assume we want to implement a request -> response flow and we'll revert to using <code>write</code> rather than <code>writev</code>. Because <code>write</code> might return <code>error.WouldBlock</code> before writing our whole message, we need to add more state to our <code>Client</code>:</p>

{% highlight zig %}
const Client = struct {
    reader: Reader,
    socket: posix.socket_t,
    address: std.net.Address,

    // added these two fields
    to_write: []u8,
    write_buf: []u8,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        // create write_buf
        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        // write_buf now needs to be freed
        allocator.free(self.write_buf);
    }
    ...
};
{% endhighlight %}

<p>We've added and initialized a <code>write_buf</code> field. When <code>writeMessage</code> is called, we'll copy the bytes (along with the length prefix) here. <code>to_write</code> is a slice of <code>write_buf</code> which represents the bytes we still need to write. You'll often see implementations add a <code>mode</code> to reflect whether or not the client is currently reading or writing data. For now, we'll just use <code>to_write</code> - when <code>to_write.len == 0</code>, it means we're reading (or waiting for) a message.</p>
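<p>The bookkeeping around <code>to_write</code> can be exercised without a real socket. The sketch below assumes a hypothetical <code>FakeSocket</code> that accepts at most <code>per_call</code> bytes per write and then reports <code>error.WouldBlock</code> until <code>drain</code> is called, roughly mimicking a full send buffer. The equally hypothetical <code>flush</code> has the same shape as the <code>write</code> method shown below: send what we can, then keep the unwritten tail.</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical stand-in for a non-blocking socket: each write accepts at
// most `per_call` bytes, then the "socket" reports WouldBlock until drain().
const FakeSocket = struct {
    per_call: usize,
    blocked: bool = false,
    buf: [64]u8 = undefined,
    len: usize = 0,

    fn write(self: *FakeSocket, bytes: []const u8) error{WouldBlock}!usize {
        if (self.blocked) return error.WouldBlock;
        const n = @min(bytes.len, self.per_call, self.buf.len - self.len);
        @memcpy(self.buf[self.len..][0..n], bytes[0..n]);
        self.len += n;
        self.blocked = true; // pretend the send buffer is now full
        return n;
    }

    // Simulates the peer reading some data: the socket is writable again.
    fn drain(self: *FakeSocket) void {
        self.blocked = false;
    }
};

// Same shape as Client.write: send what we can, store the unwritten tail
// back into `to_write`, and return true only once nothing is left.
fn flush(sock: *FakeSocket, to_write: *[]const u8) bool {
    var buf = to_write.*;
    defer to_write.* = buf;

    while (buf.len > 0) {
        const n = sock.write(buf) catch return false; // WouldBlock: retry later
        buf = buf[n..];
    }
    return true;
}
{% endhighlight %}

<p>Each call to <code>flush</code> picks up exactly where the previous one stopped, which is why <code>to_write</code> has to live on the <code>Client</code> rather than on the stack of a single <code>writeMessage</code> call.</p>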

<p><code>writeMessage</code> must change to copy the message into <code>write_buf</code>:</p>

{% highlight zig %}
fn writeMessage(self: *Client, msg: []const u8) !bool {
    if (self.to_write.len > 0) {
        // Depending on how you structure your code, this might not be possible
        // For example, in an HTTP server, the application might not control
        // the actual "writeMessage" call, and thus it would not be possible
        // to have more than one writeMessage per request
        return error.PendingMessage;
    }

    if (msg.len + 4 > self.write_buf.len) {
        // Could allocate a dynamic buffer. Could use a large buffer pool.
        return error.MessageTooLarge;
    }

    // write our 4-byte little-endian length prefix into the write buffer
    std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);

    // copy the message to our write buffer
    const end = msg.len + 4;
    @memcpy(self.write_buf[4..end], msg);

    // setup our to_write slice
    self.to_write = self.write_buf[0..end];

    // immediately write what we can
    return self.write();
}

// Returns `false` if we didn't manage to write the whole message
// Returns `true` if the message is fully written
fn write(self: *Client) !bool {
    var buf = self.to_write;

    // when this function exits, we'll store whatever isn't written back into
    // self.to_write. If we wrote everything, then this will be an empty
    // slice (which is what we want)
    defer self.to_write = buf;

    while (buf.len > 0) {
        const n = posix.write(self.socket, buf) catch |err| switch (err) {
            error.WouldBlock => return false,
            else => return err,
        };

        // As long as buf.len > 0, I don't *think* write can ever return 0.
        // But I'm not sure either.
        if (n == 0) {
            return error.Closed;
        }

        // this is what we still have to write
        buf = buf[n..];
    } else {
        return true;
    }
}
{% endhighlight %}

<p><code>writeMessage</code> creates and stores the length-prefixed message in our new <code>write_buf</code>, and then calls <code>write</code> to write as much of the message as possible. For its part, <code>write</code> writes as many of our unsent bytes, stored in <code>to_write</code>, as the socket will accept. Crucially, it returns <code>false</code> if the write is incomplete, and <code>true</code> otherwise. This is used by our <code>run</code> function to switch the socket between read and write mode. Here's the modified poll loop within our server's <code>run</code> method. Only a handful of lines, near the end, have changed:</p>

{% highlight zig %}
while (i < self.connected) {
    const revents = self.client_polls[i].revents;
    if (revents == 0) {
        i += 1;
        continue;
    }

    const client = &self.clients[i];
    if (revents & posix.POLL.IN == posix.POLL.IN) {
        while (true) {
            const msg = client.readMessage() catch {
                self.removeClient(i);
                break;
            } orelse {
                i += 1;
                break;
            };

            // We now echo the message back to the user
            const written = client.writeMessage(msg) catch {
                self.removeClient(i);
                break;
            };
            // If writeMessage didn't fully write the message, we change to
            // write-mode, asking to be notified of the socket's write-readiness
            // instead of its read-readiness.
            if (written == false) {
                self.client_polls[i].events = posix.POLL.OUT;
                // we're done with this client for now, move on to the next one
                i += 1;
                break;
            }
            // else, the entire message was written, we stay in read-mode
            // and see if the client has another message ready
        }
    } else if (revents & posix.POLL.OUT == posix.POLL.OUT) {
        // This whole block is new. This means that socket was previously put
        // into write-mode and that it is now ready. We write what we can.
        const written = client.write() catch {
            self.removeClient(i);
            continue;
        };
        if (written) {
            // and if the entire message was written, we revert to read-mode.
            self.client_polls[i].events = posix.POLL.IN;
        }
        i += 1;
    } else {
        // Neither IN nor OUT: revents only holds an error condition
        // (ERR, HUP or NVAL), so drop the client. Without this branch
        // `i` would never advance and we'd loop forever.
        self.removeClient(i);
    }
}
{% endhighlight %}

<p>We now attempt to write the message back to the client. If <code>writeMessage</code> is able to immediately write the entire message (if it returns <code>true</code>), then nothing changes: we go back to trying to read more messages from the client. If <code>writeMessage</code> returns <code>false</code>, then our message was not fully written and we switch to "write-mode", which is to say, we stop monitoring <code>POLL.IN</code> and start monitoring <code>POLL.OUT</code>.</p>

<p>In this implementation, we're either monitoring <code>POLL.IN</code> or <code>POLL.OUT</code>, never both. This is why we can use an <code>else if</code> to check if <code>revents</code> is signaling write-readiness. And, if it is, we try to write more data. Once all data is written, we can revert to monitoring <code>POLL.IN</code>.</p>

<p>It's possible to monitor both <code>POLL.IN</code> and <code>POLL.OUT</code> at the same time, but as we saw, we should only monitor <code>POLL.OUT</code> if we actually have something to write, else we'll get endless and pointless notifications. We could support multiple pending write messages by appending new messages to <code>write_buf</code> and expanding <code>to_write</code> accordingly. Or, we could use an array or <code>ArrayList</code> of buffers, one per pending message.</p>
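<p>As a rough sketch of the first option, here's a hypothetical <code>PendingWrites</code> type that queues several length-prefixed messages in one buffer. It tracks the unwritten region with <code>start</code> and <code>end</code> indexes rather than a <code>to_write</code> slice and, as an assumption of this sketch, compacts the buffer when a new message doesn't fit after the pending bytes.</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical extension, not the code used in this post: several
// length-prefixed messages queued in one buffer. buf[start..end] is the
// unwritten region (what the post calls `to_write`).
const PendingWrites = struct {
    buf: []u8,
    start: usize = 0,
    end: usize = 0,

    fn toWrite(self: *const PendingWrites) []u8 {
        return self.buf[self.start..self.end];
    }

    /// Record that `n` bytes were sent by a (possibly partial) write.
    fn consume(self: *PendingWrites, n: usize) void {
        self.start += n;
        if (self.start == self.end) {
            // everything was sent, reuse the buffer from the beginning
            self.start = 0;
            self.end = 0;
        }
    }

    fn append(self: *PendingWrites, msg: []const u8) !void {
        const needed = msg.len + 4;
        if (self.end + needed > self.buf.len and self.start > 0) {
            // slide the unwritten bytes to the front to make room
            const pending = self.end - self.start;
            std.mem.copyForwards(u8, self.buf[0..pending], self.buf[self.start..self.end]);
            self.start = 0;
            self.end = pending;
        }
        if (self.end + needed > self.buf.len) return error.MessageTooLarge;

        std.mem.writeInt(u32, self.buf[self.end..][0..4], @intCast(msg.len), .little);
        @memcpy(self.buf[self.end + 4 ..][0..msg.len], msg);
        self.end += needed;
    }
};
{% endhighlight %}

<p>The trade-off versus a list of buffers is a copy during compaction in exchange for a single allocation and a single contiguous region to pass to <code>write</code>.</p>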

<p>As-is, our <code>Client</code> has no way to reach into the corresponding <code>pollfd</code>. That means that we can't expose the <code>writeMessage</code> function for the application to call. Only our Server's <code>run</code> method can call <code>writeMessage</code> because only it can handle a partial write by changing the <code>pollfd</code>'s events to <code>POLL.OUT</code>. This is not an insurmountable problem: the client could hold a reference to the server as well as its <code>client_polls</code> index. However, this becomes much easier to solve using <code>epoll</code> and <code>kqueue</code>, so we'll leave it until then.</p>

<p>Finally, because <code>write</code> often succeeds in a single call, there is an opportunity to optimize <code>writeMessage</code>. We could first try a vectored write, avoiding having to copy bytes to our <code>write_buf</code>. However, <code>writev</code>, like <code>write</code>, might do a partial write and then return <code>error.WouldBlock</code>, in which case we'd need to copy only the unwritten bytes to <code>write_buf</code>. I'll leave this optimization to you.</p>
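<p>If you do take this on, the fiddly part is working out which bytes still need copying after a partial <code>writev</code>. Here's a sketch of just that step, assuming the length prefix and the message were passed as two separate iovecs and that <code>written</code> is the byte count <code>writev</code> returned. <code>stashUnwritten</code> is a hypothetical helper; the slice it returns would become the new <code>to_write</code>.</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical helper for the writev-first optimization. `prefix` and `msg`
// were sent as two iovecs and writev reported `written` bytes. Copy only the
// unsent tail into `write_buf` and return it (the new to_write).
fn stashUnwritten(write_buf: []u8, prefix: []const u8, msg: []const u8, written: usize) ![]u8 {
    const total = prefix.len + msg.len;
    std.debug.assert(written <= total);
    if (total - written > write_buf.len) return error.MessageTooLarge;

    var out: usize = 0;
    if (written < prefix.len) {
        // writev stopped inside the prefix itself
        const rest = prefix[written..];
        @memcpy(write_buf[0..rest.len], rest);
        out = rest.len;
    }

    // how much of msg was already sent (0 if writev stopped inside the prefix)
    const msg_sent = if (written > prefix.len) written - prefix.len else 0;
    const msg_rest = msg[msg_sent..];
    @memcpy(write_buf[out..][0..msg_rest.len], msg_rest);
    out += msg_rest.len;

    return write_buf[0..out];
}
{% endhighlight %}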

<h3 id=referencing_clients><a href="#referencing_clients" aria-hidden=true>Referencing Clients</a></h3>
<p>We previously improved the organization of our code by introducing a <code>Server</code> structure and extracting various behavior into their own methods. We also introduced a <code>Client</code> to maintain state, such as our read and write buffers, associated with a socket. It isn't hard to imagine the <code>Client</code> becoming the main point of interaction with the rest of the application. As-is, that wouldn't be possible because the client doesn't have a fixed location, i.e. it can't safely be referenced from outside the server. When a client is removed, <code>removeClient</code> moves the last client into the freed slot of the <code>clients</code> array, so a <code>Client</code> instance doesn't have a stable address.</p>

<p>We'll fix this by allocating clients on the heap. To accommodate this change, we'll add a <a href=/Zig-MemoryPool-Allocator/>MemoryPool</a> to our server and change the type of value our <code>clients</code> array holds from <code>Client</code> to <code>*Client</code>:</p>

{% highlight zig %}
const Server = struct {
    client_pool: std.heap.MemoryPool(Client),
    // change: Client -> *Client
    clients: []*Client,
    // ...
};
{% endhighlight %}

<p>Our Server's <code>init</code> and <code>deinit</code> need to be adjusted:</p>

{% highlight zig %}
fn init(allocator: Allocator, max: usize) !Server {
    const polls = try allocator.alloc(posix.pollfd, max + 1);
    errdefer allocator.free(polls);

    // changed Client to *Client
    const clients = try allocator.alloc(*Client, max);
    errdefer allocator.free(clients);

    return .{
        .polls = polls,
        .clients = clients,
        .client_polls = polls[1..],
        .connected = 0,
        .allocator = allocator,
        // added
        .client_pool = std.heap.MemoryPool(Client).init(allocator),
    };
}

fn deinit(self: *Server) void {
    self.allocator.free(self.polls);
    self.allocator.free(self.clients);
    // added
    self.client_pool.deinit();
}
{% endhighlight %}

<p>If you aren't familiar with Zig's <code>MemoryPool</code>, it's a specialized allocator, built on top of an <code>ArenaAllocator</code>, that creates values of a single type. It maintains a free-list of previously destroyed values for re-use, so subsequent calls to <code>create</code> can be very cheap.</p>
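<p>A small test shows both properties we care about. Live items keep a stable address, and, with the current standard library implementation, the slot of a destroyed item is handed back by the next <code>create</code>. The <code>Point</code> type and <code>poolReusesSlot</code> function are made up for this example.</p>

{% highlight zig %}
const std = @import("std");

// Made-up item type for the example.
const Point = struct { x: i32, y: i32 };

// Creates two items, destroys the first, and checks that the next create()
// reuses the freed slot while the surviving item is left untouched.
fn poolReusesSlot(allocator: std.mem.Allocator) !bool {
    var pool = std.heap.MemoryPool(Point).init(allocator);
    defer pool.deinit(); // frees every item at once

    const a = try pool.create();
    a.* = .{ .x = 1, .y = 2 };
    const b = try pool.create();
    b.* = .{ .x = 3, .y = 4 };

    pool.destroy(a);
    const c = try pool.create(); // expected to come from the free-list
    return c == a and b.x == 3;
}
{% endhighlight %}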

<p>We need to make three final changes. First, we need to change our <code>accept</code> method to use <code>client_pool</code> to create an instance:</p>

{% highlight zig %}
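const client = try self.client_pool.create();
errdefer self.client_pool.destroy(client);
client.* = Client.init(self.allocator, socket, address) catch |err| {
    posix.close(socket);
    // this is a normal (non-error) return, so the errdefer above won't
    // run: release the pool slot ourselves
    self.client_pool.destroy(client);
    log.err("failed to initialize client: {}", .{err});
    return;
};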
{% endhighlight %}

<p>Then, for cleanup, when <code>removeClient</code> is called, we need to destroy the client:</p>

{% highlight zig %}
fn removeClient(self: *Server, at: usize) void {
    const client = self.clients[at];
    defer self.client_pool.destroy(client);
    // ...
}
{% endhighlight %}

<p>Finally, in our <code>run</code> method, we were previously getting a reference to the <code>Client</code> stored in our array with <code>&amp;self.clients[i]</code>. We no longer need to take the address of the array element since the value is already a pointer. Thus, the code becomes: <code>const client = self.clients[i];</code> (notice the &amp; is removed).</p>

<aside><p>As in previous parts, if you'd like to see these changes as part of the whole, you can <a href=#appendix-a>skip to the end</a> to see the complete runnable code.</p></aside>

<p>Allowing clients to be referenced is our first step in making <code>Client</code> a first-class citizen in our system. Next we'll look at implementing a read timeout, which takes advantage of being able to safely reference a <code>Client</code>.</p>

<h3 id=timeout><a href="#timeout" aria-hidden=true>Timeouts</a></h3>
<p>In Part 1 we called <code>setsockopt</code> with the <code>SO.RCVTIMEO</code> and <code>SO.SNDTIMEO</code> options to set a timeout on subsequent <code>read</code> and <code>write</code> operations. I wish I could tell you that you can use the same mechanism and, on timeout, <code>poll</code> will notify you. Unfortunately, that isn't the case. As far as I know, there isn't a built-in way to hook read/write timeouts into evented I/O. What we do have is the ability to pass a timeout to <code>poll</code> itself. So far we've been passing a timeout of -1, which means <code>poll</code> blocks until there's at least one event.</p>

<p>When <code>poll</code> is given a timeout, in milliseconds, it'll return once the timeout expires even if no socket is ready. We need a way to find the client which is going to time out next, and set <code>poll</code>'s timeout based on that. This is something that we'll need to do before every call to <code>poll</code>, so it seems like it'll be prohibitively expensive: looping through every client to find the one closest to timing out. But there's a time- and memory-efficient data structure that's perfect for solving this problem: a doubly linked list.</p>

<p>Say we want to enforce an idle timeout of 5 minutes. If a client doesn't send a message within 5 minutes of connecting or within 5 minutes of their last message, they'll get disconnected. Initially we have no clients, so we have an empty linked list:</p>

{% highlight text %}
head -> null  null <- tail
{% endhighlight %}

<p>At this point, we can set a timeout of -1 (infinity). When the first client connects, we set its <code>read_timeout</code> field to <code>now + 5 minutes</code> and append it to the linked list. In the name of readability, I'm going to use a timestamp to display the absolute timeout of a client:</p>

{% highlight text %}
head -> c1[to=13:00] <-tail
{% endhighlight %}

<p>Any client that connects later will have a timeout no earlier than this client's. If three more clients connect, we set their <code>read_timeout</code> field and can append them to our list:</p>

{% highlight text %}
head -> c1[to=13:00] <-> c2[to=13:02] <-> c3[to=13:02] <-> c4[to=13:05] <-tail
{% endhighlight %}

<p>Put differently, the oldest connection is the one that'll timeout soonest. Thus, by always appending new connections to the end of our list, we can traverse the list to find timed-out clients and stop iterating as soon as we find a client with a timeout in the future. This client will be the next to time-out. This also holds true after a message is received. Let's say that both c1 and c3 send us a message. All we need to do is move them to the end of our list:</p>

{% highlight text %}
head -> c2[to=13:02] <-> c4[to=13:05] <-> c1[to=13:07] <-> c3[to=13:07] <-tail
{% endhighlight %}
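<p>Moving a client to the end is a constant-time remove followed by a constant-time append. Here's a minimal sketch of that bookkeeping. It uses a hand-rolled list so the pointer updates are visible (the appendix uses <code>std.DoublyLinkedList</code> instead), and the <code>Node</code>, <code>TimeoutList</code> and <code>touch</code> names are hypothetical. Deadlines are plain integers, so the scenario above can be replayed using minutes past 13:00 as the unit.</p>

{% highlight zig %}
const std = @import("std");

// One node per connection. Because every deadline is `now + timeout`,
// appending at the tail keeps the list sorted by deadline.
const Node = struct {
    id: u32,
    deadline: i64,
    prev: ?*Node = null,
    next: ?*Node = null,
};

const TimeoutList = struct {
    first: ?*Node = null,
    last: ?*Node = null,

    fn append(self: *TimeoutList, node: *Node) void {
        node.prev = self.last;
        node.next = null;
        if (self.last) |last| {
            last.next = node;
        } else {
            self.first = node;
        }
        self.last = node;
    }

    fn remove(self: *TimeoutList, node: *Node) void {
        if (node.prev) |prev| {
            prev.next = node.next;
        } else {
            self.first = node.next;
        }
        if (node.next) |next| {
            next.prev = node.prev;
        } else {
            self.last = node.prev;
        }
        node.prev = null;
        node.next = null;
    }

    /// Called when a message arrives: push the deadline out and move the
    /// node to the tail, both in O(1).
    fn touch(self: *TimeoutList, node: *Node, now: i64, timeout: i64) void {
        node.deadline = now + timeout;
        self.remove(node);
        self.append(node);
    }
};
{% endhighlight %}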

<p>You might be wondering: what if you want to have two separate timeouts? For example, you might want a short timeout for initial messages (maybe as part of an authentication flow) and then a much longer timeout for all subsequent messages. As with most problems in computer science, the solution is: add more linked lists! For each distinct timeout value, we need a distinct list. Since every client in a given list shares the same timeout duration, appending still keeps each list ordered. We can figure out the next timeout value to pass to <code>poll</code> by taking the smallest next timeout value across all lists.</p>
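<p>Combining the lists is cheap, since each list only contributes the timeout of its head. Here's a sketch, assuming each list has already been reduced to a <code>poll</code>-style timeout where <code>-1</code> means "nothing pending"; <code>combineTimeouts</code> is a hypothetical helper:</p>

{% highlight zig %}
const std = @import("std");

// Each entry is one list's poll timeout in milliseconds, where -1 means
// "this list is empty". The result is the smallest non-negative value,
// or -1 (block forever) if every list is empty.
fn combineTimeouts(timeouts: []const i32) i32 {
    var result: i32 = -1;
    for (timeouts) |t| {
        if (t < 0) continue; // nothing waiting in this list
        if (result < 0 or t < result) result = t;
    }
    return result;
}
{% endhighlight %}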

<p>Most of this code has little to do with network programming - we're mostly just moving linked list nodes around. You can see the <a href=#appendix-a>full implementation</a> at the end. The way we enforce the timeout and get the timeout value to pass to <code>poll</code> is worth reviewing though:</p>

{% highlight zig %}
while (true) {
    const next_timeout = self.enforceTimeout();
    _ = try posix.poll(self.polls[0..self.connected + 1], next_timeout);

    // ...
}
{% endhighlight %}

<p>We no longer pass <code>-1</code> as the timeout to <code>poll</code>. The new <code>enforceTimeout</code>, which is always called before we poll, not only disconnects timed-out clients, but also returns the number of milliseconds until our next client times out. This is the maximum amount of time we can block on <code>poll</code>, and so it is our timeout:</p>

{% highlight zig %}
fn enforceTimeout(self: *Server) i32 {
    const now = std.time.milliTimestamp();

    var node = self.read_timeout_list.first;
    while (node) |n| {
        const client = n.data;
        const diff = client.read_timeout - now;
        if (diff > 0) {
            // this client's timeout is the first one that's in the
            // future, so we now know the maximum time we can block on
            // poll before having to call enforceTimeout again
            return @intCast(diff);
        }

        // This client's timeout is in the past, the client has timed-out.

        // Ideally, we'd call server.removeClient() and just remove the
        // client directly. But within this method, we don't know the
        // client_polls index. When we move to epoll / kqueue, this problem
        // will go away, since we won't have this awkwardness with polls,
        // client_polls and their shared index.
        posix.shutdown(client.socket, .recv) catch {};
        node = n.next;
    } else {
        // Either we have no client OR all clients have timed out. Poll can
        // block as long as it takes.
        return -1;
    }
}
{% endhighlight %}

<p>The <code>read_timeout</code> field given to each client is the absolute time at which the client will time out. It is set on connection and updated after each message is received, extending the timeout. Because our list keeps clients ordered by timeout, we can iterate through the list and disconnect any timed-out clients. As soon as we find a client with a future timeout, we can return, using the amount of time until that client times out as the timeout to pass to <code>poll</code>.</p>
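<p>The conversion at the end of <code>enforceTimeout</code> can be pulled out into a small function. The sketch below is hypothetical and differs from the code above in one respect: rather than <code>@intCast</code>, which would trip a safety check if a deadline were more than about 24.8 days away, it clamps the difference to the largest <code>i32</code>. With our one-minute timeout that can't happen, so treat it as a safeguard for much longer timeouts. <code>next_deadline</code> is the deadline of the first client that hasn't timed out, or <code>null</code> if there is none.</p>

{% highlight zig %}
const std = @import("std");

// Converts an absolute deadline (ms) into the relative timeout poll expects.
fn pollTimeout(next_deadline: ?i64, now: i64) i32 {
    const deadline = next_deadline orelse return -1; // no clients: block forever
    const diff = deadline - now;
    if (diff <= 0) return 0; // already due: poll should return immediately
    // clamp instead of @intCast so very distant deadlines can't overflow i32
    return std.math.cast(i32, diff) orelse std.math.maxInt(i32);
}
{% endhighlight %}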

<h3 id=conclusion><a href="#conclusion" aria-hidden=true>Conclusion</a></h3>
<p>The <code>poll</code> system call, while far from perfect, is a wonderful introduction to evented I/O. It teaches us the need to manage per-connection state (i.e. read and write buffers) so that we can respond to the OS's notifications about the readiness of monitored sockets. With blocking I/O, the stream-oriented nature of TCP is easy to gloss over: we can read in a loop until we have a message and write in a loop until we've written a message. With non-blocking sockets, it's something we have to put a lot more thought and care into. We still "loop" until our message is fully read or written, but that loop happens over a much wider expanse of code.</p>

<p>In the next part we'll look at <code>epoll</code>, a more powerful and better-performing Linux-specific version of <code>poll</code>. Almost everything that we've learnt so far will be directly applicable to <code>epoll</code> as well as <code>kqueue</code> (our subject after <code>epoll</code>). One of the most annoying parts of our implementation so far has been the index-sharing sync between <code>polls</code> and <code>client_polls</code>. In fairness to <code>poll</code>, using an <code>AutoHashMap</code> might have made our life easier. Still, I'm glad to say that both <code>epoll</code> and <code>kqueue</code> will allow us to clean up that mess!</p>

<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-5a-Poll/>Poll (Part 1)</a>
  <a class=next href=/TCP-Server-In-Zig-Part-6-Epoll/>Epoll</a>
</div>

<h3 id=appendix-a><a href="#appendix-a" aria-hidden=true>Appendix A - Code</a></h3>
{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}

// 1 minute
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    // creates our polls and clients slices and is passed to Client.init
    // for it to create our read and write buffers.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    // polls[0] is always our listening socket
    polls: []posix.pollfd,

    // for creating clients
    client_pool: std.heap.MemoryPool(Client),

    // list of clients, only clients[0..connected] are valid
    clients: []*Client,

    // This is always polls[1..] and it's used so that we can manipulate
    // clients and client_polls together. Necessary because polls[0] is the
    // listening socket, and we don't ever touch that.
    client_polls: []posix.pollfd,

    // clients ordered by when they will read-timeout
    read_timeout_list: ClientList,

    // for creating nodes for our read_timeout list
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(*Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);

        // One of the polling slots (the first one) is reserved for our listening
        // socket.
        self.polls[0] = .{
            .fd = listener,
            .revents = 0,
            .events = posix.POLL.IN,
        };

        const read_timeout_list = &self.read_timeout_list;

        while (true) {
            const next_timeout = self.enforceTimeout();
            _ = try posix.poll(self.polls[0..self.connected + 1], next_timeout);

            if (self.polls[0].revents != 0) {
                // listening socket is ready
                self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
            }

            var i: usize = 0;
            while (i < self.connected) {
                const revents = self.client_polls[i].revents;
                if (revents == 0) {
                    // this socket isn't ready, move on to the next one
                    i += 1;
                    continue;
                }

                const client = self.clients[i];
                if (revents & posix.POLL.IN == posix.POLL.IN) {
                    // this socket is ready to be read
                    while (true) {
                        const msg = client.readMessage() catch {
                            // we don't increment `i` when we remove the client
                            // because removeClient does a swap and puts the last
                            // client at position i
                            self.removeClient(i);
                            break;
                        } orelse {
                            // no more messages, but this client is still connected
                            i += 1;
                            break;
                        };

                        client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                        read_timeout_list.remove(client.read_timeout_node);
                        read_timeout_list.append(client.read_timeout_node);

                        const written = client.writeMessage(msg) catch {
                            self.removeClient(i);
                            break;
                        };
                        if (written == false) {
                            // switch to write-mode and move on to the next client
                            self.client_polls[i].events = posix.POLL.OUT;
                            i += 1;
                            break;
                        }
                    }
                } else if (revents & posix.POLL.OUT == posix.POLL.OUT) {
                    const written = client.write() catch {
                        self.removeClient(i);
                        continue;
                    };
                    if (written) {
                        self.client_polls[i].events = posix.POLL.IN;
                    }
                    i += 1;
                } else {
                    // only an error condition (ERR, HUP or NVAL) was reported
                    self.removeClient(i);
                }
            }
        }
    }

    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();

        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                // this client's timeout is the first one that's in the
                // future, so we now know the maximum time we can block on
                // poll before having to call enforceTimeout again
                return @intCast(diff);
            }

            // This client's timeout is in the past. Shut down the read side of
            // the socket: (at least on Linux) the next poll reports it as
            // readable, the read returns 0 (error.Closed) and the main loop
            // removes the client.
            // Ideally, we'd call removeClient() directly, but within this
            // method we don't know the client's client_polls index. When we
            // move to epoll / kqueue, this problem will go away, since we won't
            // need to keep polls and client_polls in sync by index.
            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            // We have no client that times out in the future (if we did
            // we would have hit the return above).
            return -1;
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.client_polls.len - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

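            // if anything below fails with an error, don't leak the socket
            errdefer posix.close(socket);

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);

            client.* = Client.init(self.allocator, socket, address) catch |err| {
                // This is a normal `return`, so the errdefers above don't run:
                // release the socket and the pool slot ourselves.
                posix.close(socket);
                self.client_pool.destroy(client);
                log.err("failed to initialize client: {}", .{err});
                return;
            };
            errdefer client.deinit(self.allocator);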

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };
            self.read_timeout_list.append(client.read_timeout_node);

            const connected = self.connected;
            self.clients[connected] = client;
            self.client_polls[connected] = .{
                .fd = socket,
                .revents = 0,
                .events = posix.POLL.IN,
            };
            self.connected = connected + 1;
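        } else {
            // We filled every free slot without hitting WouldBlock. Stop
            // polling the listener until removeClient frees a slot.
            self.polls[0].events = 0;
        }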
    }

    fn removeClient(self: *Server, at: usize) void {
        const client = self.clients[at];
        defer {
            posix.close(client.socket);
            self.client_node_pool.destroy(client.read_timeout_node);
            client.deinit(self.allocator);
            self.client_pool.destroy(client);
        }

        // Swap the client we're removing with the last one
        // So that when we set connected -= 1, it'll effectively "remove"
        // the client from our slices.
        const last_index = self.connected - 1;
        self.clients[at] = self.clients[last_index];
        self.client_polls[at] = self.client_polls[last_index];
        self.connected = last_index;

        // Maybe the listener was disabled because we were full,
        // but now we have a free slot.
        self.polls[0].events = posix.POLL.IN;

        self.read_timeout_list.remove(client.read_timeout_node);
    }
};


const Client = struct {
    socket: posix.socket_t,
    address: std.net.Address,

    // Used to read length-prefixed messages
    reader: Reader,

    // Bytes we still need to send. This is a slice of `write_buf`. When
    // empty, we're in "read-mode" and are waiting for a message from the
    // client.
    to_write: []u8,

    // Buffer for storing our length-prefixed messages
    write_buf: []u8,

    // absolute time, in milliseconds, when this client should time out if
    // a message isn't received
    read_timeout: i64,

    // Node containing this client in the server's read_timeout_list
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0, // let the server set this
            .read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !bool {
        if (self.to_write.len > 0) {
            // Depending on how you structure your code, this might not be possible.
            // For example, in an HTTP server, the application might not control
            // the actual "writeMessage" call, and thus it would not be possible
            // to make more than one writeMessage call per request.
            // For this demo, we'll just return an error.
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            // Could allocate a dynamic buffer. Could use a large buffer pool.
            return error.MessageTooLarge;
        }

        // copy our length prefix + message to our buffer
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        // setup our to_write slice
        self.to_write = self.write_buf[0..end];

        // immediately write what we can
        return self.write();
    }

    // Returns `false` if we didn't manage to write the whole message
    // Returns `true` if the message is fully written
    fn write(self: *Client) !bool {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return false,
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return true;
        }
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        const buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
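            // ensureSpace expects the total number of bytes needed from `start`
            // (like total_len below), not the number of bytes still missing
            self.ensureSpace(4) catch unreachable;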
            return null;
        }

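        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix. Widen to usize
        // before adding so that, on 64-bit targets, a huge (possibly malicious)
        // prefix can't overflow the u32 and panic.
        const total_len = @as(usize, message_len) + 4;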

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};
{% endhighlight %}
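<p>The timeout logic relies on one property: every deadline is "now + <code>READ_TIMEOUT_MS</code>", so appending a client to the tail of <code>read_timeout_list</code> whenever it sends a message keeps the list sorted by deadline, and <code>enforceTimeout</code> can stop at the first deadline that's still in the future. The self-contained sketch below isolates that idea so it can be run with <code>zig test</code>. <code>Conn</code>, <code>TimeoutList</code>, <code>touch</code> and <code>check</code> are hypothetical names, not part of the server, and the list is a small hand-rolled intrusive doubly linked list rather than <code>std.DoublyLinkedList</code>, so the sketch doesn't depend on that type's API:</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical, cut-down stand-in for the server's Client: only the field
// the timeout logic needs, plus intrusive list links.
const Conn = struct {
    id: u32,
    read_timeout: i64 = 0, // absolute deadline, in milliseconds
    prev: ?*Conn = null,
    next: ?*Conn = null,
};

// Result of scanning the list: how many connections have expired and how
// long poll may block (-1 means no deadline is pending).
const Check = struct { expired: usize, wait_ms: i32 };

const TimeoutList = struct {
    first: ?*Conn = null,
    last: ?*Conn = null,

    fn append(self: *TimeoutList, c: *Conn) void {
        c.prev = self.last;
        c.next = null;
        if (self.last) |last| {
            last.next = c;
        } else {
            self.first = c;
        }
        self.last = c;
    }

    // `c` must currently be in the list.
    fn remove(self: *TimeoutList, c: *Conn) void {
        if (c.prev) |prev| {
            prev.next = c.next;
        } else {
            self.first = c.next;
        }
        if (c.next) |next| {
            next.prev = c.prev;
        } else {
            self.last = c.prev;
        }
        c.prev = null;
        c.next = null;
    }

    // A message arrived: push the deadline out and move `c` to the tail.
    // Since every deadline is now + timeout_ms, the tail is always the latest.
    fn touch(self: *TimeoutList, c: *Conn, now: i64, timeout_ms: i64) void {
        c.read_timeout = now + timeout_ms;
        self.remove(c);
        self.append(c);
    }

    // Walks from the head, counting expired entries, and stops at the first
    // deadline still in the future (the same walk enforceTimeout does).
    fn check(self: *const TimeoutList, now: i64) Check {
        var expired: usize = 0;
        var node = self.first;
        while (node) |n| : (node = n.next) {
            const diff = n.read_timeout - now;
            if (diff > 0) return .{ .expired = expired, .wait_ms = @intCast(diff) };
            expired += 1;
        }
        return .{ .expired = expired, .wait_ms = -1 };
    }
};

test "a touched connection moves behind older deadlines" {
    const timeout_ms = 5_000;
    var list = TimeoutList{};
    var a = Conn{ .id = 1, .read_timeout = 0 + timeout_ms }; // connects at t=0
    var b = Conn{ .id = 2, .read_timeout = 1_000 + timeout_ms }; // connects at t=1000
    list.append(&a);
    list.append(&b);

    // at t=2000, `a` sends a message: its deadline becomes 7000
    list.touch(&a, 2_000, timeout_ms);
    try std.testing.expectEqual(@as(u32, 2), list.first.?.id);

    // at t=6500, `b` (deadline 6000) has expired and `a` has 500ms left
    const result = list.check(6_500);
    try std.testing.expectEqual(@as(usize, 1), result.expired);
    try std.testing.expectEqual(@as(i32, 500), result.wait_ms);
}
{% endhighlight %}

<p>Refreshing a client after each message is a constant-time unlink plus append, and <code>wait_ms</code> plays the same role as <code>enforceTimeout</code>'s return value: the longest we can block in <code>poll</code> before a deadline needs attention.</p>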
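<p>Both <code>Client.writeMessage</code> and <code>Reader.bufferedMessage</code> use the same framing: a 4-byte little-endian <code>u32</code> length followed by the payload. The sketch below isolates that framing in two hypothetical helpers, <code>encodeFrame</code> and <code>decodeFrame</code>, which work on plain byte slices so they can be exercised with <code>zig test</code> without opening a socket. As in <code>bufferedMessage</code>, decoding returns <code>null</code> until a complete frame has been buffered:</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical helper: writes `msg` into `out` behind a 4-byte little-endian
// length prefix (the layout Client.writeMessage builds in write_buf) and
// returns the encoded frame.
fn encodeFrame(out: []u8, msg: []const u8) error{MessageTooLarge}![]u8 {
    if (out.len < 4 or msg.len > out.len - 4 or msg.len > std.math.maxInt(u32)) {
        return error.MessageTooLarge;
    }
    std.mem.writeInt(u32, out[0..4], @intCast(msg.len), .little);
    const end = msg.len + 4;
    @memcpy(out[4..end], msg);
    return out[0..end];
}

const Frame = struct {
    payload: []const u8,
    consumed: usize, // bytes to skip to reach the next frame
};

// Hypothetical helper: returns the first complete frame in `buf`, or null if
// more bytes are needed (mirroring Reader.bufferedMessage).
fn decodeFrame(buf: []const u8) ?Frame {
    if (buf.len < 4) return null;
    const len = std.mem.readInt(u32, buf[0..4], .little);
    // compute in u64 so even a 0xFFFFFFFF prefix can't overflow
    const total = @as(u64, len) + 4;
    if (buf.len < total) return null;
    const end: usize = @intCast(total); // fits: total <= buf.len
    return .{ .payload = buf[4..end], .consumed = end };
}

test "a frame is only decoded once all of it has arrived" {
    var out: [64]u8 = undefined;
    const frame = try encodeFrame(&out, "hello");
    try std.testing.expectEqualSlices(u8, &[_]u8{ 5, 0, 0, 0 }, frame[0..4]);

    // a partial read (prefix plus two payload bytes) is not enough yet
    try std.testing.expect(decodeFrame(frame[0..6]) == null);

    const decoded = decodeFrame(frame).?;
    try std.testing.expectEqualStrings("hello", decoded.payload);
    try std.testing.expectEqual(@as(usize, 9), decoded.consumed);
}
{% endhighlight %}

<p>Because decoding only looks at whatever bytes are already buffered, a short read never loses data: the caller reads more and tries again. Doing the length arithmetic in <code>u64</code> means a bogus prefix simply looks like a frame that hasn't finished arriving, rather than overflowing.</p>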
<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-5a-Poll/>Poll (Part 1)</a>
  <a class=next href=/TCP-Server-In-Zig-Part-6-Epoll/>Epoll</a>
</div>
